{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "159663ea-ba3f-44a8-aee4-d0de3d43d13a",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "# DataSet介绍\n",
    "\n",
    "DataSet是Spark1.6引入的分布式数据集合，Spark2.0合并DataSet和DataFrame数据集合API，DataFrame变成DataSet的子集，DataFrame=DataSet[Row]。\n",
    "\n",
    "DataSet是一个强类型，并且类型安全的数据容器，并且提供了结构化查询API和类似RDD一样的命令式API 。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "1f62bb3d-b944-4f07-846f-4034505e6e14",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "# DataSet的创建"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "2014fc59-92b7-4266-a990-b20b1d7eac91",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 通过序列创建DataSet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "a60197fc-edec-4404-903e-dbae8772b438",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "val ds = spark.range(1, 10, 2)\n",
    "\n",
    "ds.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "0e1b6452-4ba2-4346-b42c-a6c350bf6784",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "val ds = spark.createDataset(0 to 10)\n",
    "\n",
    "ds.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "2a5aabe4-d7c8-40f5-a0e4-b9f9f66cd272",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 通过集合创建DataSet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "7a72d043-7865-4369-88e8-23bab826c618",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "case class Person(name: String, age: Int)\n",
    "\n",
    "val ds = spark.createDataset(Seq(Person(\"wux\",10),Person(\"labs\",20)))\n",
    "\n",
    "ds.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "32fbfd41-2c79-4b28-a1d6-489d09c482af",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 通过RDD创建DataSet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "cfacad0b-5570-424a-9017-aba6ea6a8182",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "val rdd = sc.parallelize(List(1,2,3,4,5), 4)\n",
    "\n",
    "val ds = rdd.toDS\n",
    "\n",
    "ds.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "a324fb73-551b-41af-84c2-153c781cdcbe",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "val rdd = sc.parallelize(List(Person(\"wux\",10),Person(\"labs\",20)), 4)\n",
    "\n",
    "val ds = rdd.toDS\n",
    "\n",
    "ds.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "5fa078ec-f7e4-45e5-b42b-4b388a0624bf",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 通过DataFrame创建DataSet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "f3909737-a80e-42e8-bb1b-fb004cc5e1c3",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "case class Order (\n",
    "val OrderNumber: String,\n",
    "val OrderDate: String,\n",
    "val ItemName: String,\n",
    "val Quantity: Integer,\n",
    "val ProductPrice: Double,\n",
    "val TotalProducts: Integer\n",
    ")\n",
    "\n",
    "val df = spark.read.parquet(\"/mnt/databrickscontainer1/restaurant-1-orders.parquet\")\n",
    "\n",
    "val ds = df.as[Order]\n",
    "\n",
    "ds.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "69ff568a-b2cb-4cc2-a481-e7ba1e0517b5",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "# DataSet的底层\n",
    "\n",
    "DataSet 最底层处理的是对象的序列化形式，通过查看 DataSet 生成的物理执行计划，也就是最终处理的RDD，就可以判定 DataSet 底层处理的是什么形式的数据。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "d90638b2-a17f-4844-835a-783c478d3d52",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "val rdd = ds.queryExecution.toRdd"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "aa351042-b918-4ba5-92fa-d5c8623792f6",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "dataset.queryExecution.toRdd 这个 API 可以看到 DataSet 底层执行的 RDD，这个 RDD 中的范型是 catalyst.InternalRow ，InternalRow 又称为 Catalyst Row ，是 DataSet 底层的数据结构，也就是说，无论 DataSet 的范型是什么，无论是 DataSet[Order] 还是其它的，其最底层进行处理的数据结构都是 InternalRow 。\n",
    "\n",
    "所以， DataSet 的范型对象在执行之前，需要通过 Encoder 转换为 InternalRow，在输入之前，需要把 InternalRow 通过 Decoder 转换为范型对象。\n",
    "\n",
    "* DataSet 是一个 Spark 组件，其底层还是 RDD 。\n",
    "* DataSet 提供了访问对象中某个字段的能力，不用像 RDD 一样每次都要针对整个对象做操作。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "4389724a-74c8-4aa3-947d-001f2bfa4681",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "# DataSet与DataFrame"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "491f0ce7-7169-4019-8ab3-e43137480779",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 相同的地方"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "7f4d7165-7d3d-4e1f-b31c-f4f1e6b31e11",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### 相同的DSL访问"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "b07d5734-5dd4-4a1a-a58d-1b25104c669e",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "df.printSchema()\n",
    "\n",
    "ds.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "17dd36ac-2d1e-4803-8ad6-08fd77aa03e9",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "df.describe().show()\n",
    "\n",
    "ds.describe().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "2dca0e30-ee28-42e0-a59e-46bb6b029a6f",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "df.select(\"OrderNumber\",\"ItemName\").show()\n",
    "\n",
    "ds.select(\"OrderNumber\",\"ItemName\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "1af1b234-ea2c-4adc-915b-49d51d7bb0b4",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "df.filter(\"OrderNumber = 16117\").show()\n",
    "\n",
    "ds.filter(\"OrderNumber = 16118\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "3ac717b6-bab9-4dd4-8768-6d72ac080a4e",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "df.sort(df(\"ProductPrice\")).show()\n",
    "\n",
    "ds.sort(ds(\"ProductPrice\").desc).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "57a00eef-82c3-43a4-8c6d-acf1acff3a5d",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### 相同的SQL访问"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "f691ca02-1080-4294-bc31-72b2609b0fd2",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "df.createOrReplaceTempView(\"df\")\n",
    "\n",
    "spark.sql(\"select * from df where OrderNumber = 16117\").show()\n",
    "\n",
    "ds.createOrReplaceTempView(\"ds\")\n",
    "\n",
    "spark.sql(\"select * from ds where OrderNumber = 16118\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "9c697375-f1f4-4f22-94b0-4689987d107d",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 不同的地方"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "3b45d54d-fd8e-4759-80b0-b8cb0d3163cc",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### 数据类型的不同"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "e67e0571-ef8d-49a5-8d43-c547af6e9ae2",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "val ds = spark.range(1, 10, 2)\n",
    "\n",
    "val df = ds.toDF()\n",
    "\n",
    "val df1 = df.first()\n",
    "\n",
    "val ds1 = ds.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "5e952d9a-1f17-44a5-81b5-8e70a57ba50a",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "case class Person(name: String, age: Int)\n",
    "\n",
    "val ds = spark.createDataset(Seq(Person(\"wux\",10),Person(\"labs\",20)))\n",
    "\n",
    "val df = ds.toDF()\n",
    "\n",
    "val df1 = df.first()\n",
    "\n",
    "val ds1 = ds.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "12c47628-db94-4256-8b12-4400542650f2",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "val rdd = sc.textFile(\"/mnt/databrickscontainer1/restaurant-1-orders.csv\")\n",
    "\n",
    "val ds = rdd.toDS\n",
    "\n",
    "val df = ds.toDF()\n",
    "\n",
    "val df1 = df.first()\n",
    "\n",
    "val ds1 = ds.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "ef1c8838-ec4c-4add-a2ad-2d9191071094",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "case class Order (\n",
    "val OrderNumber: String,\n",
    "val OrderDate: String,\n",
    "val ItemName: String,\n",
    "val Quantity: Integer,\n",
    "val ProductPrice: Double,\n",
    "val TotalProducts: Integer\n",
    ")\n",
    "\n",
    "val df = spark.read.parquet(\"/mnt/databrickscontainer1/restaurant-1-orders.parquet\")\n",
    "\n",
    "val ds = df.as[Order]\n",
    "\n",
    "val df1 = df.first()\n",
    "\n",
    "val ds1 = ds.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "55a3597c-95cc-454d-aab6-db75388e4879",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "df.map(x => x.getClass().getName()).show(3, false)\n",
    "\n",
    "ds.map(x => x.getClass().getName()).show(3, false)\n",
    "\n",
    "ds.toDF().map(x => x.getClass().getName()).show(3, false)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "ff77bc81-f852-4314-acc6-84f4ad19884d",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### 操作方式的不同"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "dcff94c9-8b71-4039-b149-ba22edad6306",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "df.printSchema()\n",
    "df.map(x => x.getString(2)).show(5)\n",
    "\n",
    "ds.printSchema()\n",
    "ds.map(x => x.ItemName).show(5)\n",
    "\n",
    "// df.map(x => x.ItemName).show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "927b5ae3-9801-4626-8051-b8a828884453",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### DataSet提供了编译时类型检查"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "cb00ee17-931f-4db7-8537-a942888227d1",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 相互转换\n",
    "\n",
    "DataFrame和DataSet之间可以相互转换。\n",
    "\n",
    "\n",
    "**DataFrame转为DataSet**\n",
    "\n",
    "df.as[ElementType] 这样可以把DataFrame转化为DataSet。\n",
    "\n",
    "**DataSet转为DataFrame**\n",
    "\n",
    "ds.toDF() 这样可以把DataSet转化为DataFrame。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "317b9fc6-cb86-4041-a8c7-865290cb5d0f",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "case class Order (\n",
    "val OrderNumber: String,\n",
    "val OrderDate: String,\n",
    "val ItemName: String,\n",
    "val Quantity: Integer,\n",
    "val ProductPrice: Double,\n",
    "val TotalProducts: Integer\n",
    ")\n",
    "\n",
    "val df1 = spark.read.parquet(\"/mnt/databrickscontainer1/restaurant-1-orders.parquet\")\n",
    "\n",
    "val ds1 = df1.as[Order]\n",
    "\n",
    "val df2 = ds1.toDF()"
   ]
  }
 ],
 "metadata": {
  "application/vnd.databricks.v1+notebook": {
   "dashboards": [],
   "language": "python",
   "notebookMetadata": {
    "pythonIndentUnit": 4
   },
   "notebookName": "DataSet介绍",
   "notebookOrigID": 2180139033493830,
   "widgets": {}
  },
  "kernelspec": {
   "display_name": "",
   "name": ""
  },
  "language_info": {
   "name": ""
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
